package net.apexes.commons.lang;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
 * 异步并行处理器。将要处理的数据加入到阻塞队列中，启动工作线程从队列中获取数据并处理。
 *
 * @author <a href=mailto:hedyn@foxmail.com>HeDYn</a>
 */
public class AsyncExecutor<E> {

    public static Builder builder() {
        return new Builder();
    }

    public static class Builder {
        private int queueCapacity;
        private int parallelism;
        private AsyncExecutor.ErrorMonitor errorMonitor;
        private ThreadFactory threadFactory;

        private Builder() {
        }

        /**
         * 设置待处理队列的最大容量。
         * @param queueCapacity 待处理队列的最大容量。
         * @return
         */
        public Builder queueCapacity(int queueCapacity) {
            this.queueCapacity = queueCapacity;
            return this;
        }

        /**
         * 设置并行处理线程数。
         * @param parallelism 并行处理的线程数。
         * @return
         */
        public Builder parallelism(int parallelism) {
            this.parallelism = parallelism;
            return this;
        }

        /**
         * 设置处理线程异常监控器。
         * @param errorMonitor 异常监控器，用于监控处理线程的异常。
         * @return
         */
        public Builder errorMonitor(ErrorMonitor errorMonitor) {
            this.errorMonitor = errorMonitor;
            return this;
        }

        /**
         * 设置处理线程的线程工厂。
         * @param threadFactory 处理线程的线程工厂。
         * @return
         */
        public Builder threadFactory(ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
            return this;
        }

        public <E> AsyncExecutor<E> build(Worker<E> worker) {
            return new AsyncExecutor<>(queueCapacity, parallelism, worker, errorMonitor, threadFactory);
        }
    }

    private static final int MAX_CAP = 0x7fff;  // max #workers - 1

    private final int queueCapacity;
    private final int parallelism;
    private final AsyncExecutor.Worker<E> worker;
    private final AsyncExecutor.ErrorMonitor errorMonitor;
    private final ThreadFactory threadFactory;

    private volatile boolean running = false;
    private volatile boolean closing = false;
    private BlockingQueue<E> queue;
    private ExecutorService executorService;

    /**
     * 创建一个异步并行处理器实例。
     * @param worker 数据的处理者实例。
     */
    public AsyncExecutor(Worker<E> worker) {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), worker);
    }

    /**
     * 创建一个异步并行处理器实例。
     * @param parallelism 并行处理的线程数。
     * @param worker 数据的处理者实例。
     */
    public AsyncExecutor(int parallelism, Worker<E> worker) {
        this(Integer.MAX_VALUE, parallelism, worker);
    }

    /**
     * 创建一个异步并行处理器实例。
     * @param queueCapacity 待处理队列的最大容量
     * @param parallelism 并行处理的线程数。
     * @param worker 数据的处理者实例。
     */
    public AsyncExecutor(int queueCapacity, int parallelism, Worker<E> worker) {
        this(queueCapacity, parallelism, worker, null);
    }

    /**
     * 创建一个异步并行处理器实例。
     * @param queueCapacity 待处理队列的最大容量
     * @param parallelism 并行处理的线程数。
     * @param worker 数据的处理者实例。
     * @param monitor 异常监控器，用于监控处理线程的异常。
     */
    public AsyncExecutor(int queueCapacity, int parallelism, Worker<E> worker, ErrorMonitor monitor) {
        this(queueCapacity, parallelism, worker, monitor, null);
    }

    /**
     * 创建一个异步并行处理器实例。
     * @param queueCapacity 待处理队列的最大容量
     * @param parallelism 并行处理的线程数。
     * @param worker 数据的处理者实例。
     * @param monitor 异常监控器，用于监控处理线程的异常。
     * @param threadFactory 处理线程的线程工厂。
     */
    public AsyncExecutor(int queueCapacity, int parallelism, Worker<E> worker, ErrorMonitor monitor, ThreadFactory threadFactory) {
        Checks.verifyNotNull(worker, "worker");
        this.queueCapacity = checkQueueCapacity(queueCapacity);
        this.parallelism = checkParallelism(parallelism);
        this.worker = worker;
        this.errorMonitor = monitor;
        if (threadFactory == null) {
            this.threadFactory = NamedThreadFactory.pool("async-worker-");
        } else {
            this.threadFactory = threadFactory;
        }
    }

    private int checkQueueCapacity(int queueCapacity) {
        if (queueCapacity <= 0) {
            throw new IllegalArgumentException("queueCapacity = " + queueCapacity);
        }
        return queueCapacity;
    }

    private int checkParallelism(int parallelism) {
        if (parallelism <= 0 || parallelism > MAX_CAP) {
            throw new IllegalArgumentException("parallelism = " + parallelism);
        }
        return parallelism;
    }

    /**
     * 启动处理，调用此方法后要调用 {@link #close()} 方法释放资源。
     */
    public synchronized void start() {
        if (running) {
            throw new IllegalStateException("already running.");
        }
        closing = false;
        running = true;
        queue = new LinkedBlockingQueue<>(queueCapacity);
        executorService = Executors.newFixedThreadPool(parallelism, threadFactory);
        for (int i = 0; i < parallelism; i++) {
            executorService.execute(new Task());
        }
    }

    /**
     * 停止处理并释放资源。
     */
    public synchronized void close() {
        closing = true;
        if (executorService != null) {
            executorService.shutdownNow();
            executorService = null;
        }
        if (queue != null) {
            queue.clear();
        }
        running = false;
    }

    /**
     * 将指定元素加入到处理队列中（如果立即可行且不会违反容量限制），成功时返回 true，如果当前没有可用的空间，则返回 false。
     * @param data 要处理的元素
     * @return 如果该元素已添加到处理队列，则返回 true；否则返回 false
     */
    public boolean offer(E data) {
        if (closing) {
            throw new IllegalStateException("already closing.");
        }
        return queue.offer(data);
    }

    /**
     * 将指定元素插入处理队列中，在到达指定的等待时间前等待可用的空间（如果有必要）
     * @param data 要处理的元素
     * @param timeout 放弃之前等待的时间长度，以 unit 为时间单位
     * @param unit 确定如何解释 timeout 参数的 TimeUnit
     * @return 如果成功，则返回 true；如果在空间可用前超过了指定的等待时间，则返回 false
     * @throws InterruptedException 如果在等待时被中断
     */
    public boolean offer(E data, long timeout, TimeUnit unit) throws InterruptedException {
        if (closing) {
            throw new IllegalStateException("already closing.");
        }
        return queue.offer(data, timeout, unit);
    }

    /**
     * 获取当前处理队列的元素个数
     * @return
     */
    public int getQueueSize() {
        if (queue == null) {
            return 0;
        }
        return queue.size();
    }

    /**
     * 判断此处理器是否处于激活状态。
     * @return 如果处于激活状态返回 true，否则返回 false
     */
    public boolean isActivated() {
        return running && !closing;
    }

    private class Task implements Runnable {

        @Override
        public void run() {
            while (true) {
                try {
                    execute(queue.take());
                    if (closing) {
                        break;
                    }
                } catch (InterruptedException e) {
                    if (closing) {
                        break;
                    } else if (errorMonitor != null) {
                        errorMonitor.onError(e);
                    }
                }
            }
        }

        private void execute(E data) {
            try {
                worker.execute(data);
            } catch (Exception e) {
                if (errorMonitor != null) {
                    errorMonitor.onError(e);
                }
            }
        }
    }

    /**
     * 数据处理者，执行对待处理数据的处理工作。
     * @param <E>
     */
    public interface Worker<E> {
        void execute(E data);
    }

    /**
     * 处理线程异常的监控器。
     */
    public interface ErrorMonitor {
        void onError(Exception exception);
    }

}
